Glueを使ってRDSからPinpointのセグメント情報を抽出してみた
こんにちは。たかやまです。
RDS(SQL Server)のテーブル情報をPinpointのセグメントにインポートしたいケースがあり、今回はGlueを使ってデータ抽出を行い、Pinpointのセグメントファイルのデータ形式に変換してS3に出力してみたいと思います。
今回の検証環境のサンプルコードはこちらです。(Glue ジョブは含まず)
やりたいこと
SQL ServerのUsersテーブルをPinpointにインポートできるセグメント情報に変換してS3に格納
やってみた
SQL Serverの設定
SQL Server Express Editionを用意して、以下のコマンドで検証用のテーブルを事前に作成しています。
CREATE DATABASE Test; GO USE Test; GO CREATE TABLE Users ( UserID CHAR(5) PRIMARY KEY, FirstName VARCHAR(10) NOT NULL, LastName VARCHAR(10) NOT NULL, Telephone VARCHAR(20), email VARCHAR(256) ); GO INSERT INTO Users (UserID, FirstName, LastName, Telephone, email) VALUES (1, 'example', 'user1', '080-xxxx-0001', '[email protected]'), (2, 'example', 'user2', '080-xxxx-0002', '[email protected]'), (3, 'example', 'user3', '' , '[email protected]'), (4, 'example', 'user4', '080-xxxx-0004', ''), (5, 'example', 'user5', '', '') GO
Glue接続の設定
RDSに接続するためのGlueの接続
を設定します。
VPC内のRDSに接続するために以下の設定をしている必要があります。
- Glue用自己参照型セキュリティグループの用意
- S3への接続経路
セキュリティグループ作成
Glueの接続を許可するために、Glueにアタッチするセキュリティグループは自己参照のインバウンドソースが必要になります。
S3エンドポイント作成
また、GlueのスクリプトファイルなどをS3に保存するためにS3へのインターネット経由の経路またはVPCエンドポイントが必要になります。
ここではGateway型のS3エンドポイントを用意しています。
Glue 接続の作成
追記:RDSのパスワードをSecrets Managerで管理している場合は、Glue接続の作成をGlue StudioコンソールのConnectorsから実施してください。
左ペインの接続
-> 接続の追加
を選択します。
接続プロパティで適当な一意の接続名、接続タイプJDBC
を設定します。
次にデータストアへのアクセスを設定します。
項目 | 内容 |
---|---|
JDBC URL | SQL ServerのJDBC URL : sqlserver://host:port;databaseName=db_name host : RDSエンドポイント port : RDSの接続ポート db_name : 接続先のデータベース名 |
ユーザー名 | データベースにアクセスできるユーザ |
パスワード | ユーザのパスワード |
VPC | 接続先のRDSを含んでいるVPC |
サブネット | 接続先のRDSを含んでいるサブネット |
セキュリティグループ | 自己参照インバウンドポートが空いているセキュリティーグループ RDSに接続可能なセキュリティグループ |
設定に問題がなければ完了
を選択します。
設定した内容で接続可能か接続のテスト
を実施します。
RDSに接続できるGlueのロールを選択します。
作成していない場合には、リンクの手順を参考に作成してください。
ステップ 2: AWS Glue 用にIAM ロールを作成する - AWS Glue
設定に問題がなければ、正常にインスタンスに接続されました。
と表示されることが確認できます。(数分かかります。)
S3まわりで問題がある場合には以下のようなエラーメッセージが出力されます。
Reason: could not find S3 endpoint or NAT gateway for subnetId subnet-xxxx in Vpc vpc-xxxx
S3のエラーはわかりやすいですが、JDBC URLなどRDS接続周りで接続できない場合には以下のようなエラーメッセージが出るので参考までに
Unable to resolve any valid connection.
Glue クローラの設定
Glue クローラの作成
SQL Serverのテーブル情報抽出のため、Glue クローラを設定します。
適当なクローラーの名前を設定します。
クローラーのソースはSQL Serverを利用するため、Data stores
を選択します。
データベースの追加ではJDBC
を選択して、接続の項目にはさきほど作成したGlue接続
を選びます。
インクルードパスには%
を指定し、接続先データベースのテーブルをすべて取得します。
接続のテストで利用したロールを選択します。
クローラの実行スケジュールを選択します。
ここではオンデマンドでの実行を選択します。
クローラの出力先を設定します。
出力先のGlueデータベースが無い場合はデータベースの追加からデータベースを用意してください。
最後に設定を確認し問題がなければ、完了を選択します。
Glueテーブルの作成
作成されたクローラを実行します。
クローラーが問題なく完了すれば、Glue テーブルにSQL Serverのテーブル情報が追加されます。
Glue ジョブの設定
Glue Studioでジョブを作成していきます。作成したフローの全体像はこちらです。
Job Script
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.dynamicframe import DynamicFrameCollection from awsglue.dynamicframe import DynamicFrame from awsglue import DynamicFrame import re # Script generated for node Custom transform def add_colum2(glueContext, dfc) -> DynamicFrameCollection: from pyspark.sql.functions import col, concat, lit, regexp_replace df = dfc.select(list(dfc.keys())[0]).toDF() df = df.withColumn("ChannelType", lit("SMS")) df = df.withColumn("Address", regexp_replace("Address", "-", "")) df = df.withColumn("Address", regexp_replace("Address", "^.", "")) df_add_colum = df.withColumn("Address", concat(lit("+81"), col("Address"))) output_df1 = DynamicFrame.fromDF(df_add_colum, glueContext, "output_df") return DynamicFrameCollection({"output_df1": output_df1}, glueContext) # Script generated for node Custom transform def MyTransform(glueContext, dfc) -> DynamicFrameCollection: selected = dfc.select(list(dfc.keys())[0]).toDF() reprep = selected.coalesce(1) results = DynamicFrame.fromDF(reprep, glueContext, "results") return DynamicFrameCollection({"results": results}, glueContext) def sparkUnion(glueContext, unionType, mapping, transformation_ctx) -> DynamicFrame: for alias, frame in mapping.items(): frame.toDF().createOrReplaceTempView(alias) result = spark.sql( "(select * from source1) UNION " + unionType + " (select * from source2)" ) return DynamicFrame.fromDF(result, glueContext, transformation_ctx) # Script generated for node Custom transform def add_colum1(glueContext, dfc) -> DynamicFrameCollection: from pyspark.sql.functions import lit df = dfc.select(list(dfc.keys())[0]).toDF() df_add_colum = df.withColumn("ChannelType", lit("EMAIL")) output_df1 = DynamicFrame.fromDF(df_add_colum, glueContext, "output_df") return DynamicFrameCollection({"output_df1": output_df1}, glueContext) args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node SQL Server table SQLServertable_node1 = glueContext.create_dynamic_frame.from_catalog( database="mssql-s3-integration-dev-db", table_name="test_dbo_users", transformation_ctx="SQLServertable_node1", ) # Script generated for node ApplyMapping ApplyMapping_node2 = ApplyMapping.apply( frame=SQLServertable_node1, mappings=[("telephone", "string", "Address", "string")], transformation_ctx="ApplyMapping_node2", ) # Script generated for node Apply Mapping ApplyMapping_node1658129522250 = ApplyMapping.apply( frame=SQLServertable_node1, mappings=[("email", "string", "Address", "string")], transformation_ctx="ApplyMapping_node1658129522250", ) # Script generated for node Filter Filter_node1658143711633 = Filter.apply( frame=ApplyMapping_node2, f=lambda row: (bool(re.match("\S", row["Address"]))), transformation_ctx="Filter_node1658143711633", ) # Script generated for node Filter Filter_node1658143945724 = Filter.apply( frame=ApplyMapping_node1658129522250, f=lambda row: (bool(re.match("\S", row["Address"]))), transformation_ctx="Filter_node1658143945724", ) # Script generated for node Custom transform Customtransform_node1658140204230 = add_colum2( glueContext, DynamicFrameCollection( {"Filter_node1658143711633": Filter_node1658143711633}, glueContext ), ) # Script generated for node Custom transform Customtransform_node1658140211475 = add_colum1( glueContext, DynamicFrameCollection( {"Filter_node1658143945724": Filter_node1658143945724}, glueContext ), ) # Script generated for node Select From Collection SelectFromCollection_node1658140219909 = SelectFromCollection.apply( dfc=Customtransform_node1658140204230, key=list(Customtransform_node1658140204230.keys())[0], transformation_ctx="SelectFromCollection_node1658140219909", ) # Script generated for node Select From Collection SelectFromCollection_node1658140226309 = SelectFromCollection.apply( dfc=Customtransform_node1658140211475, key=list(Customtransform_node1658140211475.keys())[0], transformation_ctx="SelectFromCollection_node1658140226309", ) # Script generated for node Union Union_node1658134250028 = sparkUnion( glueContext, unionType="ALL", mapping={ "source1": SelectFromCollection_node1658140226309, "source2": SelectFromCollection_node1658140219909, }, transformation_ctx="Union_node1658134250028", ) # Script generated for node Custom transform Customtransform_node1658311524969 = MyTransform( glueContext, DynamicFrameCollection( {"Union_node1658134250028": Union_node1658134250028}, glueContext ), ) # Script generated for node Select From Collection SelectFromCollection_node1658311587979 = SelectFromCollection.apply( dfc=Customtransform_node1658311524969, key=list(Customtransform_node1658311524969.keys())[0], transformation_ctx="SelectFromCollection_node1658311587979", ) # Script generated for node Amazon S3 AmazonS3_node1658134594283 = glueContext.write_dynamic_frame.from_options( frame=SelectFromCollection_node1658311587979, connection_type="s3", format="csv", connection_options={ "path": "s3://mssql-s3-integration-dev-mssqls3integrationdevs30-q462kdal8dd1", "partitionKeys": [], }, transformation_ctx="AmazonS3_node1658134594283", ) job.commit()
項目ごとに説明していきます。
Data Source
には作成したSQL Serverのデータカタログを指定します。
Apply Mapping
で不要なカラムの削除とtelephone/emailカラムの分離、リネームを行っています。
レコードには値が入っていないものもあるので、Filter
を使い値の入っているレコードのみ抽出します。
Custom transform
でpinpointで必須項目のChannelTypeのカラムを追加していきます。
Custom transoformの出力はSelectFromCollection
でDynamicFrameに変換する必要があります。
add_colum2の処理ではカラム追加に合わせて電話番号情報をpinpointで扱うための国際公衆電気通信番号(E.164)に変換する処理も行っています。
Amazon Pinpoint での電話番号の検証 - Amazon Pinpoint
def add_colum2 (glueContext, dfc) -> DynamicFrameCollection: from pyspark.sql.functions import col, concat, lit, regexp_replace df = dfc.select(list(dfc.keys())[0]).toDF() df = df.withColumn("ChannelType", lit("SMS")) df = df.withColumn("Address", regexp_replace("Address", "-", "")) df = df.withColumn("Address", regexp_replace("Address", "^.", "")) df_add_colum = df.withColumn("Address", concat(lit("+81"), col("Address"))) output_df1 = DynamicFrame.fromDF(df_add_colum, glueContext, "output_df") return DynamicFrameCollection({"output_df1": output_df1}, glueContext)
Union
で分離したメールアドレスと電話のデータセットを結合します。
2つ目のCustom transform
ではGlueの出力をまとめます。
通常Glueは処理結果を複数のファイルに分割して出力します。
分割されるとpinpointへのインポートが大変になるので、coalesce
を使用して出力ファイルをひとつにまとめます。
より大きなファイルを出力するように AWS Glue ETL ジョブを設定する
def MyTransform (glueContext, dfc) -> DynamicFrameCollection: selected = dfc.select(list(dfc.keys())[0]).toDF() reprep = selected.coalesce(1) results = DynamicFrame.fromDF(reprep, glueContext, "results") return DynamicFrameCollection({"results": results}, glueContext)
Data taget
でS3バケットを選択します。
pinpointに取り込むように非圧縮のCSV形式で出力します。
ジョブ実行
作成したジョブを実行していきます。
ジョブプロパティは以下の通りです。実行のたびに全件データを抽出したいので今回Job bookmark
はDisableにしておきます。
ジョブのブックマークを使用した処理済みデータの追跡 - AWS Glue
Run
を選択して、ステータスがSucceededになれば実行完了です。
Data targetに指定したS3バケットにデータが格納されていることが確認できます。
ファイルにはCSV形式でフォーマットされたデータが格納されていることが確認できます。
電話番号がダブルクォーテーションに付与されてしまっていますが、pinpointへの取り込み上の問題はないので一旦このままで。。
Address,ChannelType [email protected],EMAIL [email protected],EMAIL [email protected],EMAIL "+8180xxxx0004",SMS "+8180xxxx0001",SMS "+8180xxxx0002",SMS
Pinpointへのインポート
出力したデータをPinpointのセグメント情報としてインポートしたいと思います。
無事、インポートされることが確認できました。
最後に
ほとんどコードを書かずにGlueを使ってやりたいことを実現できたと思います。
ただ、このブログを書いているときにGlue DataBrewでは電話番号のフォーマットを行う機能なども使えることを知ったので、今回やりたかったことはDataBrewの方が簡単にできそうな気がしました。
次はGlue Databrewでためしてみたいと思います。
以上、たかやまでした。